package com.dahuan.window;

import com.dahuan.bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;


public class Window_WindowFunction {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        //从端口中获取数据
        DataStreamSource<String> inputStream = env.socketTextStream( "localhost", 5555 );
        //转换
        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
        } );
        //分组
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy( "id" )
                .timeWindow( Time.seconds( 5 ) )
                //TODO 全窗口函数
                .apply( new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        //拿到id
                        String id = tuple.getField( 0 );
                        //拿到结束时间
                        Long windowEnd = window.getEnd();
                        //TODO 拿到输入的个数 (迭代装饰器)
                        Integer count = IteratorUtils.toList( input.iterator() ).size();

                        out.collect( new Tuple3<>( id, windowEnd, count ) );
                    }
                } );

        resultStream.print();
        env.execute("Window_WindowFunction");

    }
}
